package com.hanxiaozhang.io.makereactor_5;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Channel;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2021/8/25
 * @since 1.0.0
 */
public class SelectorThreadGroup {

    private SelectorThread[] sts = null;
    private ServerSocketChannel server = null;
    private AtomicInteger xid = new AtomicInteger(0);

    private SelectorThreadGroup worker = this;

    /**
     * @param num 线程数
     */
    public SelectorThreadGroup(int num) {
        sts = new SelectorThread[num];
        for (int i = 0; i < num; i++) {
            sts[i] = new SelectorThread(this);
            Thread thread = new Thread(sts[i]);
            thread.start();
        }
    }


    public void bind(int port) {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));

            // 注册到那个selector上？
            nextSelector(server);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 无论 是ServerSocket socket 都复用这个方法
     * 0上固定注册一个listen
     * 1和2注册客户端
     *
     * @param server
     */
    public void nextSelector(Channel server) {
        try {
            if (server instanceof ServerSocketChannel) {
                // listen 选择了 boss组中的一个线程后，要更新这个线程的work组
                SelectorThread st = next();
                st.lbq.put(server);
                st.setWorker(worker);
                st.selector.wakeup();
                // System.out.println("nextSelector--ServerSocketChannel")
            } else {
                // 在 main线程种，取到堆里的selectorThread对象
                SelectorThread st = next_worker();
                //1. 通过队列传递数据消息
                st.lbq.put(server);
                //2. 通过打断阻塞，让对应的线程去自己在打断后完成注册selector
                st.selector.wakeup();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    private SelectorThread next() {
        // 轮询，容易发生倾斜
        int index = xid.incrementAndGet() % sts.length;
        return sts[index];
    }

    private SelectorThread next_worker() {
        // 动用worker的线程分配
        int index = xid.incrementAndGet() % worker.sts.length;
        return worker.sts[index];
    }

    public void setWorker(SelectorThreadGroup worker) {
        this.worker = worker;
    }
}
